package cn.rslee.scala.demos

import org.apache.spark.rdd.RDD
import org.apache.spark.{ SparkConf, SparkContext }

/**
 * Count the number of searches per hour in a Sogou query log.
 */
/*
Sample Sogou query log
access time (HH:mm:ss)    user ID            [query]        rank of this URL in the results    order of the user's click    URL the user clicked
00:00:00    2982199073774412    [360安全卫士]    8 3    download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html
00:00:00    07594220010824798    [哄抢救灾物资]    1 1    news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml
00:00:00    5228056822071097    [75810部队]    14 5    www.greatoo.com/greatoo_cn/list.asp?link_id=276&title=%BE%DE%C2%D6%D0%C2%CE%C5
00:00:00    6140463203615646    [绳艺]    62 36    www.jd-cd.com/jd_opus/xx/200607/706.html
*/
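/*
 * A minimal parsing sketch for a full log record, assuming the five fields are
 * tab-separated and that rank and click order share one space-separated field,
 * as the sample above suggests. The job below only needs the hour, so this
 * helper is illustration only; the names here are not from the original code.
 */
case class SogouRecord(time: String, userId: String, query: String,
                       rank: Int, clickOrder: Int, url: String)

object SogouRecord {
  /** Returns None for malformed lines instead of throwing. */
  def parse(line: String): Option[SogouRecord] =
    line.split("\t") match {
      case Array(time, userId, query, rankOrder, url) =>
        rankOrder.split(" ") match {
          case Array(rank, order) =>
            scala.util.Try(
              SogouRecord(time, userId,
                query.stripPrefix("[").stripSuffix("]"),
                rank.toInt, order.toInt, url)
            ).toOption
          case _ => None
        }
      case _ => None
    }
}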
object CountByHours {
  def main(args: Array[String]): Unit = {

    // 1. Create the Spark context and read the input file.
    val conf = new SparkConf().setAppName("sogou count by hours").setMaster("local")
    val sc = new SparkContext(conf)
    // val orgRdd = sc.textFile("C:\\Users\\KING\\Desktop\\SogouQ.reduced\\SogouQ.reduced")
    val orgRdd = sc.textFile("SogouQ.reduced")
    println("total lines: " + orgRdd.count())

    // 2. Map each line to an (hour, 1) pair.
    val hourPairs: RDD[(String, Int)] = orgRdd.map(line => {
      // The access time is at the start of the line, so the first two
      // characters are the hour (HH).
      val hour = line.substring(0, 2)
      (hour, 1)
    })

    // 3. Reduce: merge the pairs by key, summing the counts per hour.
    val hourCounts: RDD[(String, Int)] = hourPairs.reduceByKey(_ + _)
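    // Equivalent shortcut (a sketch, assuming the per-hour totals fit in driver
    // memory): orgRdd.map(_.substring(0, 2)).countByValue() returns a local
    // Map[String, Long] directly, with no separate reduceByKey step.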

    // Print the counts sorted by hour.
    hourCounts.sortByKey().collect().foreach(println)

    sc.stop()
  }
}
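
/*
 * One way to run the job locally (a sketch; the jar path and Scala version are
 * assumptions that depend on your build):
 *
 *   sbt package
 *   spark-submit --class cn.rslee.scala.demos.CountByHours \
 *     --master "local[*]" path/to/your-assembly.jar
 *
 * SogouQ.reduced is resolved against the working directory; pass an absolute
 * path (or an HDFS URI) to textFile to read it from elsewhere.
 */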